{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# <font color='green'> ALS Recommender - Matrix Factorization Algorithm <font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Import common libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import numpy as np"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## <font color='green'> 1. Data Preparation<font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Download and unzip data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Please uncomment the below lines to download and unzip the dataset.\n",
    "# !wget -N http://files.grouplens.org/datasets/movielens/ml-10m.zip\n",
    "# !unzip -o ml-10m.zip\n",
    "# !mv ml-10m/ratings.dat datasets/ratings_10M.dat"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read the data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "shape of loaded dataframe: (10000054, 3)\n"
     ]
    }
   ],
   "source": [
    "import pandas as pd\n",
    "df = pd.read_csv(\"./datasets/ratings_10M.dat\",\n",
    "                 sep=\"::\",\n",
    "                 engine=\"python\",\n",
    "                 usecols=[0,1,2],\n",
    "                 names=[\"userId\", \"movieId\", \"rating\"])\n",
    "print(\"shape of loaded dataframe: {}\".format(df.shape))\n",
    "\n",
    "# adjusting the dataframe to transform zero-based userId and movieId\n",
    "userId = df['userId'].values\n",
    "movieId = df['movieId'].values\n",
    "UID, unqUID = np.unique(userId, return_inverse=True)\n",
    "MID, unqMID = np.unique(movieId, return_inverse=True)\n",
    "df['userId'] = unqUID\n",
    "df['movieId'] = unqMID"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Check Data Types"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "userId       int64\n",
       "movieId      int64\n",
       "rating     float64\n",
       "dtype: object"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Check total number of users"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "69878"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "all_users = df.userId.unique().tolist()\n",
    "len(all_users)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Collect test data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "test_ids = random.sample(all_users, 100)\n",
    "\n",
    "all_actual_items = []\n",
    "for user in test_ids:\n",
    "    user_actual_items = df.loc[(df[\"userId\"]==user) & (df[\"rating\"]>=3), \"movieId\"].tolist()\n",
    "    all_actual_items.append(user_actual_items)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Convert the data to COO and CSR format"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "shape of the input matrix is (69878, 10677)\n"
     ]
    }
   ],
   "source": [
    "userId = df['userId'].values\n",
    "movieId = df['movieId'].values\n",
    "rating = df['rating'].values\n",
    "\n",
    "from scipy.sparse import coo_matrix\n",
    "mat = coo_matrix((rating, (userId, movieId)), dtype=np.float64)\n",
    "mat_csr = mat.tocsr()\n",
    "print (\"shape of the input matrix is {}\".format(mat.shape))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Helper Function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_recommendations(handle, algo, n):\n",
    "    all_recommended_items = []\n",
    "    for user in test_ids:\n",
    "        user_recommended_items = []\n",
    "        for i in handle(user, n):\n",
    "            if algo==\"frov\":\n",
    "                item = i[0]\n",
    "            elif algo==\"spark\":\n",
    "                item = i.product\n",
    "            user_recommended_items.append(item)\n",
    "        all_recommended_items.append(user_recommended_items)\n",
    "    return all_recommended_items"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## <font color='green'> 2. Frovedis ALS<font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Frovedis ALS Train Time: 366.292 sec\n",
      "Frovedis ALS Recommendation Time: 0.947 sec\n",
      "Frovedis ALS MAP(10) Score: 0.882 \n"
     ]
    }
   ],
   "source": [
    "# initializing frovedis server\n",
    "import os\n",
    "from frovedis.exrpc.server import FrovedisServer\n",
    "FrovedisServer.initialize(\"mpirun -np 8 \" + os.environ[\"FROVEDIS_SERVER\"])\n",
    "\n",
    "# fitting the input matrix on frovedis ALS object\n",
    "from frovedis.mllib.recommendation import ALS as frovALS\n",
    "start_time = time.time()\n",
    "als = frovALS(rank=256, max_iter=15, reg_param=0.01).fit(mat_csr)\n",
    "frov_train_time = time.time() - start_time\n",
    "print(\"Frovedis ALS Train Time: %.3f sec\" % frov_train_time)\n",
    "\n",
    "# recommend 10 items for all test user\n",
    "import ml_metrics\n",
    "start_time = time.time()\n",
    "all_recommended_items = get_recommendations(als.recommend_products, \"frov\", 10)\n",
    "frov_pred_time = time.time() - start_time\n",
    "print(\"Frovedis ALS Recommendation Time: %.3f sec\" % frov_pred_time)\n",
    "\n",
    "# generate mapk (Mean Average Precision at k) score\n",
    "frov_score = ml_metrics.mapk(all_actual_items, all_recommended_items, 10)\n",
    "print(\"Frovedis ALS MAP(10) Score: %.3f \" % frov_score)\n",
    "\n",
    "# clean-up\n",
    "als.release()\n",
    "FrovedisServer.shut_down()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## <font color='green'> 3. Pyspark ALS<font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pyspark ALS Train Time: 917.875 sec\n",
      "Pyspark ALS Recommendation Time: 10.233 sec\n",
      "Pyspark ALS MAP(10) Score: 0.892\n"
     ]
    }
   ],
   "source": [
    "# initializing spark server\n",
    "from pyspark import SparkConf, SparkContext\n",
    "conf = SparkConf().set(\"spark.driver.memory\", \"15g\")\\\n",
    "                  .set(\"spark.executor.memory\", \"48g\")\n",
    "sc = SparkContext(master=\"local[12]\", appName=\"als\", conf=conf)\n",
    "\n",
    "# creating pyspark data for ALS train\n",
    "ratingsRDD = sc.parallelize(zip(mat.row, mat.col, mat.data))\n",
    "\n",
    "# fitting the input matrix on pyspark ALS object\n",
    "from pyspark.mllib.recommendation import ALS as pyspark_als\n",
    "start_time = time.time()\n",
    "model = pyspark_als.trainImplicit(ratingsRDD, rank=256, iterations=15, lambda_=0.01)\n",
    "sp_train_time = time.time() - start_time\n",
    "print(\"Pyspark ALS Train Time: %.3f sec\" % sp_train_time)\n",
    "\n",
    "# recommend 10 items for all test user\n",
    "import ml_metrics\n",
    "start_time = time.time()\n",
    "all_recommended_items = get_recommendations(model.recommendProducts, \"spark\", 10)\n",
    "sp_pred_time = time.time() - start_time\n",
    "print(\"Pyspark ALS Recommendation Time: %.3f sec\" % sp_pred_time)\n",
    "\n",
    "# generate mapk (Mean Average Precision at k) score\n",
    "sp_score = ml_metrics.mapk(all_actual_items, all_recommended_items, 10)\n",
    "print(\"Pyspark ALS MAP(10) Score: %.3f\" % sp_score)\n",
    "\n",
    "# clean-up\n",
    "sc.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## <font color='green'> 4. Performance Comparison <font> "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "evaluation of ALS using frovedis_0.9.10 vs pyspark_3.0.1: \n",
      "frovedis train speed-up: 2.506\n",
      "frovedis recommendation speed-up: 10.807\n"
     ]
    }
   ],
   "source": [
    "import frovedis\n",
    "import pyspark\n",
    "print(\"evaluation of ALS using frovedis_%s vs pyspark_%s: \" \\\n",
    "      % (frovedis.__version__, pyspark.__version__))\n",
    "print(\"frovedis train speed-up: %.3f\" % (sp_train_time / frov_train_time))\n",
    "print(\"frovedis recommendation speed-up: %.3f\" % (sp_pred_time / frov_pred_time))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
